package com.example.demo.stream;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.demo.entity.EmployeePo;
import com.example.demo.entity.Event;
import com.example.demo.env.SpringContextHolder;
import com.example.demo.mapper.EmployeePoMapper;
import com.example.demo.stream.source.DbSource;
import com.example.demo.stream.source.RedisSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;

/**
 * Flink job that reads from one of several sources as the input stream,
 * selected by the {@code --type} program argument:
 * type 1: read a CSV file from the classpath
 * type 2: read from Redis
 * type 3: read from the DB
 * type 4: read from HDFS (listed here but NOT implemented in the switch below)
 */
public class DataStreamInput {
    /**
     * Entry point: builds a source stream according to the {@code --type}
     * argument, attaches a no-watermark strategy and prints every element.
     *
     * @param args program arguments; expects {@code --type 1|2|3}
     * @throws Exception        if the Flink job fails to start or execute
     * @throws RuntimeException if {@code --type} is missing or unrecognized
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String type = parameterTool.get("type", "0");
        // Wildcard instead of a raw type: the branches produce different element
        // types (Event vs EmployeePo), so no single concrete type fits.
        DataStream<?> dataSource;
        switch (type) {
            case "1":
                dataSource = env.addSource(new FileSource());
                break;
            case "2":
                dataSource = env.addSource(new RedisSource<>("event", Event.class)).returns(Event.class);
                break;
            case "3":
                LambdaQueryWrapper<EmployeePo> queryWrapper = Wrappers.lambdaQuery();
                // Oracle date-format test value
                queryWrapper.eq(EmployeePo::getBirthday, DateUtil.parse("2022-04-28 17:46:13"));
                // MySQL date-format test value (kept for reference)
//                queryWrapper.eq(EmployeePo::getBirthday, DateUtil.parse("2022-04-28 17:46:15"));
                EmployeePoMapper employeePoMapper = SpringContextHolder.getBean(EmployeePoMapper.class);
                // Custom SQL query built from the wrapper's bound parameters
                List<EmployeePo> employeeList = employeePoMapper.getEmployeeList(queryWrapper);
                System.out.println("查询结果==> " + employeeList);
                dataSource = env.addSource(new DbSource<>(EmployeePoMapper.class, queryWrapper)).returns(EmployeePo.class);
                break;
            default:
                throw new RuntimeException("数据源类型传入错误！");
        }
        // FIX: assignTimestampsAndWatermarks returns a NEW stream; the original
        // code discarded that return value, so the watermark strategy was never
        // part of the printed pipeline. Reassign so print() consumes it.
        dataSource = dataSource.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
        dataSource.print();
        env.execute();
    }


    /**
     * 读取csv文件作为数据源
     */
    /**
     * Reads a space-separated file ({@code clicks.csv} on the classpath) as the
     * data source, emitting one {@link Event} per line.
     *
     * <p>Expected line format: {@code user url timestamp} — exactly three
     * space-separated fields, the last one parseable as a {@code long}.
     */
    public static class FileSource extends RichSourceFunction<Event> {
        // Checked by the read loop so cancel() can actually stop the source,
        // as the Flink SourceFunction contract requires. The original cancel()
        // only printed a message and never interrupted run().
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            File path = new ClassPathResource("clicks.csv").getFile();
            if (!path.exists())
                throw new RuntimeException("读取的文件不存在！");
            // FIX: try-with-resources — the original never closed the
            // FileReader/BufferedReader (resource leak). Also read with an
            // explicit UTF-8 charset instead of the platform default.
            try (BufferedReader reader = Files.newBufferedReader(path.toPath(), StandardCharsets.UTF_8)) {
                String line;
                // Emit one event per line until EOF or cancellation.
                while (running && (line = reader.readLine()) != null) {
                    // limit -1 keeps trailing empty fields so malformed lines
                    // (e.g. "a b ") still fail the length check below
                    String[] row = line.split(" ", -1);
                    if (row.length != 3)
                        throw new RuntimeException("文件内容格式不正确！");
                    Event event = new Event();
                    event.setUser(row[0]);
                    event.setUrl(row[1]);
                    event.setTimestamp(Long.parseLong(row[2]));
                    ctx.collect(event);
                }
            }
        }

        @Override
        public void cancel() {
            // Signal the read loop to exit; the reader itself is closed by the
            // try-with-resources block in run().
            running = false;
            System.out.println("文件数据源关闭");
        }
    }

    /**
     * Legacy Redis source kept below for reference only; it has been superseded
     * by the imported com.example.demo.stream.source.RedisSource used in case "2".
     */
//    public static class RedisSource extends RichSourceFunction<Event> {
//        @Override
//        public void run(SourceContext<Event> ctx) {
////            FlinkJedisPoolConfig.Builder flinkJedisConfigBase = new FlinkJedisPoolConfig.Builder();
////            flinkJedisConfigBase.setHost("192.168.174.128");
////            flinkJedisConfigBase.setPort(6379);
//            // 获取jedis连接池
//            JedisPool jedisPool = new JedisPool(new GenericObjectPoolConfig<>(), "192.168.174.128", 6379);
//            // 获取jedis连接对象
//            Jedis resource = jedisPool.getResource();
//            String event = resource.get("event");
//            // json字符串转list对象
//            List<Event> list1 = JSONUtil.toList(event, Event.class);
//            list1.forEach(ctx::collect);
//            jedisPool.close();
//            resource.close();
//        }
//
//        @Override
//        public void cancel() {
//            System.out.println("redis数据源关闭");
//        }
//    }


}

